Spark 两个RDD按key合并(join算子和cogroup算子) 您所在的位置:网站首页 join和join in后面分别加什么 Spark 两个RDD按key合并(join算子和cogroup算子)

Spark 两个RDD按key合并(join算子和cogroup算子)

2023-06-17 11:35| 来源: 网络整理| 查看: 265

在工作中经常遇到需要合并RDD的情况,记录下处理情况。join和cogroup算子都能达到要求,按key合并,只是当rdd存在多个相同的key时候,最终的输出结果不一样。网上找到了处理情况,自己也测试了,代码如下:

object Test { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("test"). master("local[2]").getOrCreate() val sc = spark.sparkContext sc.setLogLevel("WARN") /** * id name * 1 zhangsan * 2 lisi * 3 wangwu */ val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu"))) /** * id age * 1 30 * 2 29 * 4 21 */ val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21))) println("cogroup") /** * (1,(CompactBuffer(zhangsan),CompactBuffer(30))) * (2,(CompactBuffer(lisi),CompactBuffer(29))) * (3,(CompactBuffer(wangwu),CompactBuffer())) * (4,(CompactBuffer(),CompactBuffer(21))) */ idName.cogroup(idAge).collect().foreach(println) println("join") // fullOuterJoin于cogroup的结果类似, 只是数据结构不一样 /** * (1,(Some(zhangsan),Some(30))) * (2,(Some(lisi),Some(29))) * (3,(Some(wangwu),None)) * (4,(None,Some(21))) */ idName.fullOuterJoin(idAge).collect().foreach(println) /** * id score * 1 100 * 2 90 * 2 95 */ val idScore = sc.parallelize(Array((1, 100), (2, 90), (2, 95))) println("cogroup, 出现相同id时") /** * (1,(CompactBuffer(zhangsan),CompactBuffer(100))) * (2,(CompactBuffer(lisi),CompactBuffer(90, 95))) * (3,(CompactBuffer(wangwu),CompactBuffer())) */ idName.cogroup(idScore).collect().foreach(println) println("join, 出现相同id时") /** * (1,(Some(zhangsan),Some(100))) * (2,(Some(lisi),Some(90))) * (2,(Some(lisi),Some(95))) * (3,(Some(wangwu),None)) */ idName.fullOuterJoin(idScore).collect().foreach(println) } }

参考:https://blog.csdn.net/wo334499/article/details/51689563



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有